1 /** 2 * Copyright 2014 Netflix, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package rx.observables; 17 18 import java.util.Iterator; 19 import java.util.NoSuchElementException; 20 import java.util.concurrent.CountDownLatch; 21 import java.util.concurrent.Future; 22 import java.util.concurrent.atomic.AtomicReference; 23 24 import rx.Observable; 25 import rx.Subscriber; 26 import rx.Subscription; 27 import rx.functions.Action1; 28 import rx.functions.Func1; 29 import rx.internal.operators.BlockingOperatorLatest; 30 import rx.internal.operators.BlockingOperatorMostRecent; 31 import rx.internal.operators.BlockingOperatorNext; 32 import rx.internal.operators.BlockingOperatorToFuture; 33 import rx.internal.operators.BlockingOperatorToIterator; 34 import rx.internal.util.UtilityFunctions; 35 36 /** 37 * {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be 38 * useful for testing and demo purposes, but is generally inappropriate for production applications (if you 39 * think you need to use a {@code BlockingObservable} this is usually a sign that you should rethink your 40 * design). 41 * <p> 42 * You construct a {@code BlockingObservable} from an {@code Observable} with {@link #from(Observable)} or 43 * {@link Observable#toBlocking()}. 44 * <p> 45 * The documentation for this interface makes use of a form of marble diagram that has been modified to 46 * illustrate blocking operators. The following legend explains these marble diagrams: 47 * <p> 48 * <img width="640" height="301" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.legend.png" alt=""> 49 * 50 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">RxJava wiki: Blocking 51 * Observable Operators</a> 52 * @param <T> 53 * the type of item emitted by the {@code BlockingObservable} 54 */ 55 public final class BlockingObservable<T> { 56 57 private final Observable<? extends T> o; 58 59 private BlockingObservable(Observable<? extends T> o) { 60 this.o = o; 61 } 62 63 /** 64 * Converts an {@link Observable} into a {@code BlockingObservable}. 65 * 66 * @param o 67 * the {@link Observable} you want to convert 68 * @return a {@code BlockingObservable} version of {@code o} 69 */ 70 public static <T> BlockingObservable<T> from(final Observable<? extends T> o) { 71 return new BlockingObservable<T>(o); 72 } 73 74 /** 75 * Invokes a method on each item emitted by this {@code BlockingObservable} and blocks until the Observable 76 * completes. 77 * <p> 78 * <em>Note:</em> This will block even if the underlying Observable is asynchronous. 79 * <p> 80 * <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt=""> 81 * <p> 82 * This is similar to {@link Observable#subscribe(Subscriber)}, but it blocks. Because it blocks it does not 83 * need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods. If the 84 * underlying Observable terminates with an error, rather than calling {@code onError}, this method will 85 * throw an exception. 86 * 87 * @param onNext 88 * the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable} 89 * @throws RuntimeException 90 * if an error occurs 91 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a> 92 */ 93 public void forEach(final Action1<? super T> onNext) { 94 final CountDownLatch latch = new CountDownLatch(1); 95 final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>(); 96 97 /* 98 * Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior 99 * as this is the final subscribe in the chain. 100 */ 101 Subscription subscription = o.subscribe(new Subscriber<T>() { 102 @Override 103 public void onCompleted() { 104 latch.countDown(); 105 } 106 107 @Override 108 public void onError(Throwable e) { 109 /* 110 * If we receive an onError event we set the reference on the 111 * outer thread so we can git it and throw after the 112 * latch.await(). 113 * 114 * We do this instead of throwing directly since this may be on 115 * a different thread and the latch is still waiting. 116 */ 117 exceptionFromOnError.set(e); 118 latch.countDown(); 119 } 120 121 @Override 122 public void onNext(T args) { 123 onNext.call(args); 124 } 125 }); 126 // block until the subscription completes and then return 127 try { 128 latch.await(); 129 } catch (InterruptedException e) { 130 subscription.unsubscribe(); 131 // set the interrupted flag again so callers can still get it 132 // for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780 133 Thread.currentThread().interrupt(); 134 // using Runtime so it is not checked 135 throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); 136 } 137 138 if (exceptionFromOnError.get() != null) { 139 if (exceptionFromOnError.get() instanceof RuntimeException) { 140 throw (RuntimeException) exceptionFromOnError.get(); 141 } else { 142 throw new RuntimeException(exceptionFromOnError.get()); 143 } 144 } 145 } 146 147 /** 148 * Returns an {@link Iterator} that iterates over all items emitted by this {@code BlockingObservable}. 149 * <p> 150 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt=""> 151 * 152 * @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable} 153 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a> 154 */ 155 public Iterator<T> getIterator() { 156 return BlockingOperatorToIterator.toIterator(o); 157 } 158 159 /** 160 * Returns the first item emitted by this {@code BlockingObservable}, or throws 161 * {@code NoSuchElementException} if it emits no items. 162 * 163 * @return the first item emitted by this {@code BlockingObservable} 164 * @throws NoSuchElementException 165 * if this {@code BlockingObservable} emits no items 166 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 167 */ 168 public T first() { 169 return blockForSingle(o.first()); 170 } 171 172 /** 173 * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or throws 174 * {@code NoSuchElementException} if it emits no such item. 175 * 176 * @param predicate 177 * a predicate function to evaluate items emitted by this {@code BlockingObservable} 178 * @return the first item emitted by this {@code BlockingObservable} that matches the predicate 179 * @throws NoSuchElementException 180 * if this {@code BlockingObservable} emits no such items 181 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 182 */ 183 public T first(Func1<? super T, Boolean> predicate) { 184 return blockForSingle(o.first(predicate)); 185 } 186 187 /** 188 * Returns the first item emitted by this {@code BlockingObservable}, or a default value if it emits no 189 * items. 190 * 191 * @param defaultValue 192 * a default value to return if this {@code BlockingObservable} emits no items 193 * @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no 194 * items 195 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 196 */ 197 public T firstOrDefault(T defaultValue) { 198 return blockForSingle(o.map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue)); 199 } 200 201 /** 202 * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or a default 203 * value if it emits no such items. 204 * 205 * @param defaultValue 206 * a default value to return if this {@code BlockingObservable} emits no matching items 207 * @param predicate 208 * a predicate function to evaluate items emitted by this {@code BlockingObservable} 209 * @return the first item emitted by this {@code BlockingObservable} that matches the predicate, or the 210 * default value if this {@code BlockingObservable} emits no matching items 211 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 212 */ 213 public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) { 214 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue)); 215 } 216 217 /** 218 * Returns the last item emitted by this {@code BlockingObservable}, or throws 219 * {@code NoSuchElementException} if this {@code BlockingObservable} emits no items. 220 * <p> 221 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.png" alt=""> 222 * 223 * @return the last item emitted by this {@code BlockingObservable} 224 * @throws NoSuchElementException 225 * if this {@code BlockingObservable} emits no items 226 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a> 227 */ 228 public T last() { 229 return blockForSingle(o.last()); 230 } 231 232 /** 233 * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or throws 234 * {@code NoSuchElementException} if it emits no such items. 235 * <p> 236 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.p.png" alt=""> 237 * 238 * @param predicate 239 * a predicate function to evaluate items emitted by the {@code BlockingObservable} 240 * @return the last item emitted by the {@code BlockingObservable} that matches the predicate 241 * @throws NoSuchElementException 242 * if this {@code BlockingObservable} emits no items 243 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a> 244 */ 245 public T last(final Func1<? super T, Boolean> predicate) { 246 return blockForSingle(o.last(predicate)); 247 } 248 249 /** 250 * Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no 251 * items. 252 * <p> 253 * <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt=""> 254 * 255 * @param defaultValue 256 * a default value to return if this {@code BlockingObservable} emits no items 257 * @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no 258 * items 259 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a> 260 */ 261 public T lastOrDefault(T defaultValue) { 262 return blockForSingle(o.map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue)); 263 } 264 265 /** 266 * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default 267 * value if it emits no such items. 268 * <p> 269 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt=""> 270 * 271 * @param defaultValue 272 * a default value to return if this {@code BlockingObservable} emits no matching items 273 * @param predicate 274 * a predicate function to evaluate items emitted by this {@code BlockingObservable} 275 * @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the 276 * default value if it emits no matching items 277 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a> 278 */ 279 public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) { 280 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue)); 281 } 282 283 /** 284 * Returns an {@link Iterable} that always returns the item most recently emitted by this 285 * {@code BlockingObservable}. 286 * <p> 287 * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt=""> 288 * 289 * @param initialValue 290 * the initial value that the {@link Iterable} sequence will yield if this 291 * {@code BlockingObservable} has not yet emitted an item 292 * @return an {@link Iterable} that on each iteration returns the item that this {@code BlockingObservable} 293 * has most recently emitted 294 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 295 */ 296 public Iterable<T> mostRecent(T initialValue) { 297 return BlockingOperatorMostRecent.mostRecent(o, initialValue); 298 } 299 300 /** 301 * Returns an {@link Iterable} that blocks until this {@code BlockingObservable} emits another item, then 302 * returns that item. 303 * <p> 304 * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt=""> 305 * 306 * @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits 307 * a new item, whereupon the Iterable returns that item 308 * @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX documentation: TakeLast</a> 309 */ 310 public Iterable<T> next() { 311 return BlockingOperatorNext.next(o); 312 } 313 314 /** 315 * Returns an {@link Iterable} that returns the latest item emitted by this {@code BlockingObservable}, 316 * waiting if necessary for one to become available. 317 * <p> 318 * If this {@code BlockingObservable} produces items faster than {@code Iterator.next} takes them, 319 * {@code onNext} events might be skipped, but {@code onError} or {@code onCompleted} events are not. 320 * <p> 321 * Note also that an {@code onNext} directly followed by {@code onCompleted} might hide the {@code onNext} 322 * event. 323 * 324 * @return an Iterable that always returns the latest item emitted by this {@code BlockingObservable} 325 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 326 */ 327 public Iterable<T> latest() { 328 return BlockingOperatorLatest.latest(o); 329 } 330 331 /** 332 * If this {@code BlockingObservable} completes after emitting a single item, return that item, otherwise 333 * throw a {@code NoSuchElementException}. 334 * <p> 335 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt=""> 336 * 337 * @return the single item emitted by this {@code BlockingObservable} 338 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 339 */ 340 public T single() { 341 return blockForSingle(o.single()); 342 } 343 344 /** 345 * If this {@code BlockingObservable} completes after emitting a single item that matches a given predicate, 346 * return that item, otherwise throw a {@code NoSuchElementException}. 347 * <p> 348 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.p.png" alt=""> 349 * 350 * @param predicate 351 * a predicate function to evaluate items emitted by this {@link BlockingObservable} 352 * @return the single item emitted by this {@code BlockingObservable} that matches the predicate 353 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 354 */ 355 public T single(Func1<? super T, Boolean> predicate) { 356 return blockForSingle(o.single(predicate)); 357 } 358 359 /** 360 * If this {@code BlockingObservable} completes after emitting a single item, return that item; if it emits 361 * more than one item, throw an {@code IllegalArgumentException}; if it emits no items, return a default 362 * value. 363 * <p> 364 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.png" alt=""> 365 * 366 * @param defaultValue 367 * a default value to return if this {@code BlockingObservable} emits no items 368 * @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no 369 * items 370 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 371 */ 372 public T singleOrDefault(T defaultValue) { 373 return blockForSingle(o.map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue)); 374 } 375 376 /** 377 * If this {@code BlockingObservable} completes after emitting a single item that matches a predicate, 378 * return that item; if it emits more than one such item, throw an {@code IllegalArgumentException}; if it 379 * emits no items, return a default value. 380 * <p> 381 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png" alt=""> 382 * 383 * @param defaultValue 384 * a default value to return if this {@code BlockingObservable} emits no matching items 385 * @param predicate 386 * a predicate function to evaluate items emitted by this {@code BlockingObservable} 387 * @return the single item emitted by the {@code BlockingObservable} that matches the predicate, or the 388 * default value if no such items are emitted 389 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a> 390 */ 391 public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) { 392 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue)); 393 } 394 395 /** 396 * Returns a {@link Future} representing the single value emitted by this {@code BlockingObservable}. 397 * <p> 398 * If {@link BlockingObservable} emits more than one item, {@link java.util.concurrent.Future} will receive an 399 * {@link java.lang.IllegalArgumentException}. If {@link BlockingObservable} is empty, {@link java.util.concurrent.Future} 400 * will receive an {@link java.util.NoSuchElementException}. 401 * <p> 402 * If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}. 403 * <p> 404 * <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt=""> 405 * 406 * @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable} 407 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a> 408 */ 409 public Future<T> toFuture() { 410 return BlockingOperatorToFuture.toFuture(o); 411 } 412 413 /** 414 * Converts this {@code BlockingObservable} into an {@link Iterable}. 415 * <p> 416 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt=""> 417 * 418 * @return an {@link Iterable} version of this {@code BlockingObservable} 419 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a> 420 */ 421 public Iterable<T> toIterable() { 422 return new Iterable<T>() { 423 @Override 424 public Iterator<T> iterator() { 425 return getIterator(); 426 } 427 }; 428 } 429 430 /** 431 * Helper method which handles the actual blocking for a single response. 432 * <p> 433 * If the {@link Observable} errors, it will be thrown right away. 434 * 435 * @return the actual item 436 */ 437 private T blockForSingle(final Observable<? extends T> observable) { 438 final AtomicReference<T> returnItem = new AtomicReference<T>(); 439 final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>(); 440 final CountDownLatch latch = new CountDownLatch(1); 441 442 Subscription subscription = observable.subscribe(new Subscriber<T>() { 443 @Override 444 public void onCompleted() { 445 latch.countDown(); 446 } 447 448 @Override 449 public void onError(final Throwable e) { 450 returnException.set(e); 451 latch.countDown(); 452 } 453 454 @Override 455 public void onNext(final T item) { 456 returnItem.set(item); 457 } 458 }); 459 460 try { 461 latch.await(); 462 } catch (InterruptedException e) { 463 subscription.unsubscribe(); 464 Thread.currentThread().interrupt(); 465 throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); 466 } 467 468 if (returnException.get() != null) { 469 if (returnException.get() instanceof RuntimeException) { 470 throw (RuntimeException) returnException.get(); 471 } else { 472 throw new RuntimeException(returnException.get()); 473 } 474 } 475 476 return returnItem.get(); 477 } 478 }